Nacos Client原理

Nacos客户端需要引入spring-cloud-starter-alibaba-nacos-discovery依赖。

1
2
3
4
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

在配置文件中加上注册中心地址:

1
2
spring.application.name=mall-user  #微服务名称
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848 #配置nacos注册中心地址

服务注册

NacosAutoServiceRegistration中继承了AbstractAutoServiceRegistration,而该抽象类实现了ApplicationListener监听器。在容器启动时通过WebServerStartStopLifecyclestart()发布ServletWebServerInitializedEvent事件。最终通过调用NacosServiceRegistryregister来完成服务注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
return;
}
}
this.port.compareAndSet(0, event.getWebServer().getPort());
this.start();
}
public void start() {
if (!isEnabled()) {
return;
}
if (!this.running.get()) {
this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
register(); // 调用子类NacosAutoServiceRegistration的register方法
if (shouldRegisterManagement()) {
registerManagement();
}
this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}
}
protected void register() {
this.serviceRegistry.register(getRegistration());
}
}
public class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration> {
protected void register() {
if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
return;
}
if (this.registration.getPort() < 0) {
this.registration.setPort(getPort().get());
}
super.register(); // 调用超类AbstractAutoServiceRegistration的register
}
}

将当前实例封装为一个Instance对象,最终通过调用NamingServiceregisterInstance来完成服务注册。serviceId默认为应用名称group默认为DEFAULT_GROUPephemeral用于决定是使用AP模式还是CP模式默认为trueAP模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class NacosServiceRegistry implements ServiceRegistry<Registration> {
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
return;
}
// 从缓存中获取,若NacosNamingService不存在,则通过通过NacosDiscoveryProperties反射创建实例
NamingService namingService = namingService();
String serviceId = registration.getServiceId(); // 默认是应用名称
String group = nacosDiscoveryProperties.getGroup(); // 默认为DEFAULT_GROUP
Instance instance = getNacosInstanceFromRegistration(registration); // 创建应用实例
try { // 调用NacosNamingService的registerInstance方法
namingService.registerInstance(serviceId, group, instance);
} catch (Exception e) {
rethrowRuntimeException(e);
}
}
private Instance getNacosInstanceFromRegistration(Registration registration) {
Instance instance = new Instance();
instance.setIp(registration.getHost()); // 当前实例应用机器IP
instance.setPort(registration.getPort()); // 当前实例应用端口
instance.setWeight(nacosDiscoveryProperties.getWeight()); // 权重默认为1.0
instance.setClusterName(nacosDiscoveryProperties.getClusterName()); // 默认DEFAULT
instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled()); // 实例是否可用,默认true
instance.setMetadata(registration.getMetadata()); // 默认preserved.register.source = SPRING_CLOUD
instance.setEphemeral(nacosDiscoveryProperties.isEphemeral()); // 是否为临时实例即AP模式:默认true
return instance;
}
}

对于NamingService的实现类NacosNamingService的实例化,其实是通过NacosWatch中实现SmartLifecyclestart()方法,在refresh()中的finishRefresh()中调用getLifecycleProcessor().onRefresh()即调用Lifecyclestart()来完成的。从而完成了NamingService的实例化。首先创建一个NamingEvent事件监听器注册到InstancesChangeNotifierlistenerMap中,当发送数据变更时执行该监听事件。UpdateTask定时任务会每10s检查一次服务是否变更。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle {
public void start() {
if (this.running.compareAndSet(false, true)) {
EventListener eventListener = listenerMap.computeIfAbsent(buildKey(), event -> new EventListener() {
@Override
public void onEvent(Event event) {
if (event instanceof NamingEvent) {
List<Instance> instances = ((NamingEvent) event).getInstances();
// 从实例列表通过IP和端口中找到当前实例
Optional<Instance> instanceOptional = selectCurrentInstance(instances);
instanceOptional.ifPresent(currentInstance -> {
resetIfNeeded(currentInstance); // 若实例存在则重置实例metadata数据
});
}
}
});
// 从缓存中获取,若NacosNamingService不存在,则通过通过NacosDiscoveryProperties反射创建实例
NamingService namingService = nacosServiceManager.getNamingService(properties.getNacosProperties());
try { // 将上面创建的NamingEvent事件监听器注册到InstancesChangeNotifier的listenerMap中,当发送数据变更时执行该监听事件
namingService.subscribe(properties.getService(), properties.getGroup(), Arrays.asList(properties.getClusterName()), eventListener);
} catch (Exception e) {
}
// 30s后发布HeartbeatEvent事件
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(this::nacosServicesWatch, this.properties.getWatchDelay());
}
}
public void nacosServicesWatch() {
// nacos doesn't support watch now , publish an event every 30 seconds.
this.publisher.publishEvent(new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
}
}
public class NacosServiceManager {
public NamingService getNamingService(Properties properties) {
if (Objects.isNull(this.namingService)) {
buildNamingService(properties);
}
return namingService;
}
private NamingService buildNamingService(Properties properties) {
if (Objects.isNull(namingService)) {
synchronized (NacosServiceManager.class) {
if (Objects.isNull(namingService)) {
namingService = createNewNamingService(properties);
}
}
}
return namingService;
}
private NamingService createNewNamingService(Properties properties) {
try {
return createNamingService(properties);
} catch (NacosException e) {
throw new RuntimeException(e);
}
}
public static NamingService createNamingService(Properties properties) throws NacosException {
return NamingFactory.createNamingService(properties);
}
public static NamingService createNamingService(Properties properties) throws NacosException {
try { // 通过反射创建NacosNamingService实例
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
NamingService vendorImpl = (NamingService) constructor.newInstance(properties);
return vendorImpl;
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
}
public class NacosNamingService implements NamingService {
// NacosServiceManager中通过反射调用NacosNamingService该实例方法实例化NacosNamingService
public NacosNamingService(Properties properties) throws NacosException {
init(properties);
}
private void init(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
this.namespace = InitUtils.initNamespaceForNaming(properties);
InitUtils.initSerialization();
initServerAddr(properties);
InitUtils.initWebRootContext(properties);
initCacheDir();
initLogName(properties);
this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));
this.hostReactor = new HostReactor(this.serverProxy, beatReactor, this.cacheDir, isLoadCacheAtStart(properties),
isPushEmptyProtect(properties), initPollingThreadCount(properties));
}
}
public class NamingProxy implements Closeable {
public NamingProxy(String namespaceId, String endpoint, String serverList, Properties properties) {
this.securityProxy = new SecurityProxy(properties, nacosRestTemplate);
this.properties = properties;
this.setServerPort(DEFAULT_SERVER_PORT);
this.namespaceId = namespaceId;
this.endpoint = endpoint;
this.maxRetry = ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_REQUEST_DOMAIN_RETRY_COUNT,
String.valueOf(UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT)));
if (StringUtils.isNotEmpty(serverList)) {
this.serverList = Arrays.asList(serverList.split(","));
if (this.serverList.size() == 1) {
this.nacosDomain = serverList;
}
}
this.initRefreshTask();
}
private void initRefreshTask() {
this.executorService = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.naming.updater");
t.setDaemon(true);
return t;
}
});
this.executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() { // 默认30s执行一次
refreshSrvIfNeed(); // 刷新服务列表
}
}, 0, vipSrvRefInterMillis, TimeUnit.MILLISECONDS);
this.executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
securityProxy.login(getServerList());
}
}, 0, securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
refreshSrvIfNeed();
this.securityProxy.login(getServerList());
}
private void refreshSrvIfNeed() {
try {
if (!CollectionUtils.isEmpty(serverList)) {
return; // 若服务列表不为null则返回
}
if (System.currentTimeMillis() - lastSrvRefTime < vipSrvRefInterMillis) {
return;
}
List<String> list = getServerListFromEndpoint();
serversFromEndpoint = list;
lastSrvRefTime = System.currentTimeMillis();
} catch (Throwable e) {
}
}
}
public class HostReactor implements Closeable {
public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir, boolean loadCacheAtStart, boolean pushEmptyProtection, int pollingThreadCount) {
// init executorService
this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.client.naming.updater");
return thread;
}
});
this.beatReactor = beatReactor;
this.serverProxy = serverProxy;
this.cacheDir = cacheDir;
if (loadCacheAtStart) {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
} else {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
}
this.pushEmptyProtection = pushEmptyProtection;
this.updatingMap = new ConcurrentHashMap<String, Object>();
this.failoverReactor = new FailoverReactor(this, cacheDir);
this.pushReceiver = new PushReceiver(this);
this.notifier = new InstancesChangeNotifier(); // 实例化一个实例变更通知
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384); // 发布一个实例变更事件
NotifyCenter.registerSubscriber(notifier); // 注册为InstancesChange事件订阅者
}
}

UpdateTask定时任务检查到有服务变更时通过调用InstancesChangeNotifieronEvent事件从而调用NacosWatch中注册的NamingEvent事件监听器,从而完成服务信息更新。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap<String, ConcurrentHashSet<EventListener>>();
public void registerListener(String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(serviceName, clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (eventListeners == null) {
synchronized (lock) {
eventListeners = listenerMap.get(key);
if (eventListeners == null) {
eventListeners = new ConcurrentHashSet<EventListener>();
listenerMap.put(key, eventListeners);
}
}
}
eventListeners.add(listener);
}
public void onEvent(InstancesChangeEvent event) {
String key = ServiceInfo.getKey(event.getServiceName(), event.getClusters());
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (CollectionUtils.isEmpty(eventListeners)) {
return;
}
for (final EventListener listener : eventListeners) {
final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);
if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
((AbstractEventListener) listener).getExecutor().execute(new Runnable() {
@Override
public void run() {
listener.onEvent(namingEvent);
}
});
continue;
}
listener.onEvent(namingEvent);
}
}
}

若当前是一个ephemeral实例,则会通过BeatReactor#addBeatInfo添加心跳检测,然后通过NamingProxy调用Nacos服务端的HTTP接口/v1/ns/instance完成服务注册。若是AP模式则通过BeatReactoraddBeatInfo方法添加心跳定时任务默认每5s执行一次,向服务端发送心跳信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class NacosNamingService implements NamingService {
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
if (instance.isEphemeral()) { // 若是AP模式
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance); // 构建心跳信息
beatReactor.addBeatInfo(groupedServiceName, beatInfo); // 添加心跳定时任务每5s执行一次
}
serverProxy.registerService(groupedServiceName, groupName, instance); // 将服务实例注册到服务端
}
}
public class NamingProxy implements Closeable {
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST); // 调用/v1/ns/instance接口
}
public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
return reqApi(api, params, Collections.EMPTY_MAP, method);
}
public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method) throws NacosException {
return reqApi(api, params, body, getServerList(), method);
}
public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException {
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) {
throw new NacosException(NacosException.INVALID_PARAM, "no server available");
}
NacosException exception = new NacosException();
if (StringUtils.isNotBlank(nacosDomain)) {
for (int i = 0; i < maxRetry; i++) {
try {
return callServer(api, params, body, nacosDomain, method);
} catch (NacosException e) {
exception = e;
}
}
} else {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for (int i = 0; i < servers.size(); i++) {
String server = servers.get(index);
try {
return callServer(api, params, body, server, method);
} catch (NacosException e) {
exception = e;
}
index = (index + 1) % servers.size();
}
}
throw new NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
}
}

健康检查心跳

默认心跳检测时间为5s,通过ScheduledExecutorService创建延时任务5秒后执行BeatTask的run方法,其实现了Runnable接口。且若服务发送变更,停掉原来的心跳检测任务,重新注册一个新的心跳任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class BeatReactor implements Closeable {
public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(groupedServiceName);
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
return beatInfo;
}
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
if ((existBeat = dom2Beat.remove(key)) != null) { // 若服务发送变更,停掉原来的心跳检测任务,重新注册一个新的心跳任务
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
}
public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
public long getInstanceHeartBeatInterval() {
return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL, Constants.DEFAULT_HEART_BEAT_INTERVAL);
}

首先通过NamingProxy向Nacos服务端HTTP接口/v1/ns/instance/beat发送心跳请求,若当前实例未被注册,服务端会返回code为20404,会再次向服务端发送注册实例请求HTTP请求,当心跳发送完成后通过递归的方式再次向ScheduledExecutorService延时线程池中添加当前心跳任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class BeatTask implements Runnable {
public void run() {
if (beatInfo.isStopped()) {
return; // 若心跳任务被停止则直接结束,且不在重复注册
}
long nextTime = beatInfo.getPeriod();
try {// 向服务端发送心跳请求
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get("clientBeatInterval").asLong();
boolean lightBeatEnabled = false;
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0) {
nextTime = interval;
}
int code = NamingResponseCode.OK;
if (result.has(CommonParams.CODE)) {
code = result.get(CommonParams.CODE).asInt();
}
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) { // 若未找到服务则重新注册到服务端
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try { // 注册实例到Nacos注册中心
serverProxy.registerService(beatInfo.getServiceName(), NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
} catch (Exception ignore) {
}
}
} catch (NacosException ex) {
}
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}

获取调用服务列表

在通过Ribbon实现客户端负载均衡时,在RibbonClientConfiguration中注册了ILoadBalancer,当第一次掉到目标服务时,初始化LoadBalancer时调用了DynamicServerListLoadBalancerupdateListOfServers方法,最终通过NacosServerList获取Nacos服务中注册的实例列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
public class RibbonClientConfiguration {
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList, serverListFilter, serverListUpdater);
}
}
public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {
public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}
}
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
restOfInit(clientConfig);
}
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature();
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections().primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
}
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
}
}
updateAllServerList(servers);
}
}

最终调用NacosNamingServiceselectInstances,这里会通过HostReactor注册一个服务信息更新心跳任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class NacosServerList extends AbstractServerList<NacosServer> {
public List<NacosServer> getUpdatedListOfServers() {
return getServers();
}
private List<NacosServer> getServers() {
try {
String group = discoveryProperties.getGroup();
List<Instance> instances = discoveryProperties.namingServiceInstance().selectInstances(serviceId, group, true);
return instancesToServerList(instances);
} catch (Exception e) {
throw new IllegalStateException("Can not get service instances from nacos, serviceId=" + serviceId, e);
}
}
}
public class NacosNamingService implements NamingService {
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
return selectInstances(serviceName, groupName, healthy, true);
}
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {

ServiceInfo serviceInfo;
if (subscribe) { // subscribe为true
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
} else {
serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
}
return selectInstances(serviceInfo, healthy);
}
}

服务列表拉取心跳

若当前服务不存在则通过updateServiceNow调用updateService,最终调用NamingProxyqueryList调用Nacos服务端的HTTP接口/v1/ns/instance/list获取服务列表。最终将更新心跳任务UpdateTask添加到ScheduledExecutorService延时任务线程池中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class HostReactor implements Closeable {
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
if (null == serviceObj) { // 若通过服务名称从缓存列表中未获取到信息
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) { // 若服务正在更新
if (UPDATE_HOLD_INTERVAL > 0) {
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
}
}
}
}
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
synchronized (futureMap) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
}
}
private static final long DEFAULT_DELAY = 1000L;
public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}
}

心跳更新任务会10s向服务端发送一次请求,获取当前用到的实例列表最新信息,最终通过递归的方式不断的发送心跳。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class UpdateTask implements Runnable {
public void run() {
long delayTime = DEFAULT_DELAY;
try {
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
if (serviceObj == null) {
updateService(serviceName, clusters);
return;
}
if (serviceObj.getLastRefTime() <= lastRefTime) {
updateService(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
refreshOnly(serviceName, clusters);
}
lastRefTime = serviceObj.getLastRefTime();
if (!notifier.isSubscribed(serviceName, clusters) && !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
return;
}
if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
incFailCount();
return;
}
delayTime = serviceObj.getCacheMillis();
resetFailCount();
} catch (Throwable e) {
incFailCount();
} finally {
executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
}
}
}
public void updateService(String serviceName, String clusters) throws NacosException {
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
if (StringUtils.isNotEmpty(result)) {
processServiceJson(result);
}
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}

这里会将请求回的服务列表解析,并与旧的列表对表,若有变更则发布变更事件InstancesChangeEvent且将新的数据写到磁盘中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class HostReactor implements Closeable {
public ServiceInfo processServiceJson(String json) {
ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (pushEmptyProtection && !serviceInfo.validate()) { //empty or error push, just ignore
return oldService;
}
boolean changed = false;
if (oldService != null) { // 一般正常逻辑oldService不为null
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); // 将新获取到的serviceInfo放入serviceInfoMap缓存中
Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
for (Instance host : oldService.getHosts()) { // 旧实例列表
oldHostMap.put(host.toInetAddr(), host);
}
Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
for (Instance host : serviceInfo.getHosts()) { // 新实例列表
newHostMap.put(host.toInetAddr(), host);
}
Set<Instance> modHosts = new HashSet<Instance>();
Set<Instance> newHosts = new HashSet<Instance>();
Set<Instance> remvHosts = new HashSet<Instance>();
List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(newHostMap.entrySet());
for (Map.Entry<String, Instance> entry : newServiceHosts) {
Instance host = entry.getValue();
String key = entry.getKey();
if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(), oldHostMap.get(key).toString())) {
modHosts.add(host); // 被修改实例:新实例ip+port在旧实例列表中,但实例不相等
continue;
}
if (!oldHostMap.containsKey(key)) {
newHosts.add(host); // 新增实例:旧实例列表不存在
}
}
for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
Instance host = entry.getValue();
String key = entry.getKey();
if (newHostMap.containsKey(key)) {
continue;
}
if (!newHostMap.containsKey(key)) {
remvHosts.add(host); // 被删除的实例列表:新实例列表中不包含的旧实例
}
}
if (newHosts.size() > 0) { // 存在新增实例将changed置为true,输出相应日志
changed = true;
}
if (remvHosts.size() > 0) { // 存在删除实例将changed置为true,输出相应日志
changed = true;
}
if (modHosts.size() > 0) { // 存在修改实例将changed置为true,输出相应日志
changed = true;
updateBeatInfo(modHosts);
}
serviceInfo.setJsonFromServer(json);
if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
DiskCache.write(serviceInfo, cacheDir);
}
} else {
changed = true;
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
serviceInfo.setJsonFromServer(json);
DiskCache.write(serviceInfo, cacheDir); // 写入磁盘缓存
}
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
return serviceInfo;
}
private void updateBeatInfo(Set<Instance> modHosts) {
for (Instance instance : modHosts) {
String key = beatReactor.buildKey(instance.getServiceName(), instance.getIp(), instance.getPort());
// 若变更的实例时当前实例则停止原来的心跳任务重新注册心跳任务
if (beatReactor.dom2Beat.containsKey(key) && instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(instance);
beatReactor.addBeatInfo(instance.getServiceName(), beatInfo);
}
}
}
}